/*
 * Copyright 2024 CloudWeGo Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package parent

import (
	"context"
	"fmt"

	"github.com/cloudwego/eino/components/document"
	"github.com/cloudwego/eino/components/indexer"
	"github.com/cloudwego/eino/schema"
)

// Config holds the dependencies and settings used by NewIndexer to build a parent indexer.
// Indexer, Transformer and SubIDGenerator are required: NewIndexer returns an error when any
// of them is nil. ParentIDKey is copied as-is and is not validated.
type Config struct {
	// Indexer is the underlying indexer implementation that handles the actual document indexing.
	// For example: a vector database indexer like Milvus, or a full-text search indexer like Elasticsearch.
	// Required.
	Indexer indexer.Indexer

	// Transformer processes documents before indexing, typically splitting them into smaller chunks.
	// Each sub-document generated by the transformer must retain its parent document's ID.
	// For example: if a document with ID "doc_1" is split into 3 chunks, all chunks will initially
	// have ID "doc_1". These IDs will later be modified by the SubIDGenerator.
	//
	// Store groups sub-documents by runs of consecutive equal IDs, so sub-documents that belong
	// to the same parent should be adjacent in the transformer's output; otherwise
	// SubIDGenerator is invoked more than once for the same parent ID.
	//
	// Example transformations:
	// - A text splitter that breaks down large documents into paragraphs
	// - A code splitter that separates code files into functions
	//
	// Required.
	Transformer document.Transformer

	// ParentIDKey specifies the metadata key used to store the original document's ID in each sub-document.
	// For example: if ParentIDKey is "parent_id", each sub-document will have metadata like:
	// {"parent_id": "original_doc_123"}
	ParentIDKey string

	// SubIDGenerator generates unique IDs for sub-documents based on their parent document ID.
	// For example: if parent ID is "doc_1" and we need 3 sub-document IDs, it might generate:
	// ["doc_1_chunk_1", "doc_1_chunk_2", "doc_1_chunk_3"]
	//
	// The returned slice must contain exactly num IDs; Store fails otherwise.
	//
	// Parameters:
	//   - ctx: context for the operation
	//   - parentID: the ID of the parent document
	//   - num: number of sub-document IDs needed
	// Returns:
	//   - []string: slice of generated sub-document IDs
	//   - error: any error encountered during ID generation
	//
	// Required.
	SubIDGenerator func(ctx context.Context, parentID string, num int) ([]string, error)
}

// NewIndexer creates a new parent indexer that handles document splitting and sub-document management.
//
// The returned indexer runs every batch through config.Transformer, records each sub-document's
// original ID in its metadata under config.ParentIDKey, replaces the sub-document IDs with the
// ones produced by config.SubIDGenerator, and then delegates to config.Indexer.
//
// Parameters:
//   - ctx: context for the operation (currently unused by the constructor)
//   - config: configuration for the parent indexer; must not be nil, and its Indexer,
//     Transformer and SubIDGenerator fields must all be set. ParentIDKey is used as-is
//     and is not validated.
//
// Returns:
//   - indexer.Indexer: the created parent indexer
//   - error: non-nil when config is nil or a required field is missing
//
// Example usage:
//
//	indexer, err := NewIndexer(ctx, &Config{
//	    Indexer: milvusIndexer,
//	    Transformer: textSplitter,
//	    ParentIDKey: "source_doc_id",
//	    SubIDGenerator: func(ctx context.Context, parentID string, num int) ([]string, error) {
//	        ids := make([]string, num)
//	        for i := 0; i < num; i++ {
//	            ids[i] = fmt.Sprintf("%s_chunk_%d", parentID, i+1)
//	        }
//	        return ids, nil
//	    },
//	})
func NewIndexer(ctx context.Context, config *Config) (indexer.Indexer, error) {
	// Guard against a nil config so callers get an error instead of a nil-pointer panic.
	if config == nil {
		return nil, fmt.Errorf("config is nil")
	}
	if config.Indexer == nil {
		return nil, fmt.Errorf("indexer is empty")
	}
	if config.Transformer == nil {
		return nil, fmt.Errorf("transformer is empty")
	}
	if config.SubIDGenerator == nil {
		return nil, fmt.Errorf("sub id generator is empty")
	}

	return &parentIndexer{
		indexer:        config.Indexer,
		transformer:    config.Transformer,
		parentIDKey:    config.ParentIDKey,
		subIDGenerator: config.SubIDGenerator,
	}, nil
}

// parentIndexer is the indexer returned by NewIndexer. Its fields are copied
// one-to-one from the Config passed to NewIndexer.
type parentIndexer struct {
	// indexer receives the transformed sub-documents in Store.
	indexer indexer.Indexer
	// transformer turns the input documents into sub-documents in Store.
	transformer document.Transformer
	// parentIDKey is the metadata key under which each sub-document's original ID is saved.
	parentIDKey string
	// subIDGenerator produces the replacement IDs for a group of sub-documents sharing one parent ID.
	subIDGenerator func(ctx context.Context, parentID string, num int) ([]string, error)
}

// Store transforms docs into sub-documents, tags each sub-document with its parent ID,
// assigns freshly generated sub-document IDs, and stores the result in the underlying indexer.
//
// Steps:
//  1. docs are passed through the configured transformer.
//  2. Each sub-document's current ID (the parent ID) is saved in its metadata under parentIDKey.
//  3. Sub-documents are grouped by runs of consecutive equal IDs; for every run the
//     subIDGenerator is called once and the returned IDs replace the sub-documents' IDs in order.
//  4. The sub-documents are handed to the underlying indexer together with opts.
//
// Returns the IDs reported by the underlying indexer, or an error when the transformer fails,
// yields no documents or a nil document, when ID generation fails or returns the wrong number
// of IDs, or when the underlying indexer fails.
func (p *parentIndexer) Store(ctx context.Context, docs []*schema.Document, opts ...indexer.Option) ([]string, error) {
	subDocs, err := p.transformer.Transform(ctx, docs)
	if err != nil {
		return nil, fmt.Errorf("transform docs fail: %w", err)
	}
	if len(subDocs) == 0 {
		return nil, fmt.Errorf("doc transformer returned no documents")
	}

	// Validate up front so a malformed transformer result is reported as an error
	// (rather than a nil-pointer panic) before any document is mutated.
	for i, subDoc := range subDocs {
		if subDoc == nil {
			return nil, fmt.Errorf("doc transformer returned a nil document at index %d", i)
		}
	}

	currentID := subDocs[0].ID
	startIdx := 0
	for i, subDoc := range subDocs {
		if subDoc.MetaData == nil {
			subDoc.MetaData = make(map[string]interface{})
		}
		// At this point subDoc.ID is still the parent ID: IDs are only rewritten
		// for groups that have already been closed (indices below startIdx).
		subDoc.MetaData[p.parentIDKey] = subDoc.ID

		if subDoc.ID == currentID {
			continue
		}

		// The parent ID changed, so subDocs[startIdx:i] is a complete group.
		if err := p.assignSubIDs(ctx, subDocs[startIdx:i]); err != nil {
			return nil, err
		}
		startIdx = i
		currentID = subDoc.ID
	}

	// The last group is never closed inside the loop; handle it here.
	if err := p.assignSubIDs(ctx, subDocs[startIdx:]); err != nil {
		return nil, err
	}

	return p.indexer.Store(ctx, subDocs, opts...)
}

// assignSubIDs replaces the IDs of group, a non-empty run of sub-documents that all carry
// the same parent ID, with the IDs produced by the subIDGenerator for that parent.
func (p *parentIndexer) assignSubIDs(ctx context.Context, group []*schema.Document) error {
	parentID := group[0].ID

	subIDs, err := p.subIDGenerator(ctx, parentID, len(group))
	if err != nil {
		return fmt.Errorf("generate sub IDs for parent %q fail: %w", parentID, err)
	}
	if len(subIDs) != len(group) {
		return fmt.Errorf("generated sub IDs' num is unexpected: parent %q, want %d, got %d",
			parentID, len(group), len(subIDs))
	}

	for i, doc := range group {
		doc.ID = subIDs[i]
	}
	return nil
}
